#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#############################################################################
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
#          http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
# Description  : gs_upgradectl is a utility to upgrade a Gauss200 application.
#
# gs_upgradectl is an upgrade framework which controls the upgrade process.
# It contains binary upgrade, in-place upgrade and on-line binary upgrade.
#
# binary upgrade: includes stopping the old cluster,
# replacing the binaries and starting
# the new cluster; only used when no database objects changed between the
# old cluster and the new cluster.
#
# on-line binary upgrade: rolling upgrade; upgrade the standby instances
# first, switch over,
# and then upgrade the master instances. Currently only used when no database
# objects changed
# between the old cluster and the new cluster.
#
# in-place upgrade: includes binary upgrade and updating the database
# meta-data (system tables,
# system views, functions, and so on); used when some database objects have
# been changed
# between the old cluster and the new cluster.
#############################################################################

import os
import sys
import pwd
import grp
import copy
import re
import json
import subprocess

from gspylib.common.Common import DefaultValue
from gspylib.common.GaussLog import GaussLog
from gspylib.common.ParallelBaseOM import ParallelBaseOM
from gspylib.threads.SshTool import SshTool
from gspylib.common.ErrorCode import ErrorCode
from gspylib.common.ParameterParsecheck import Parameter
import impl.upgrade.UpgradeConst as Const
from impl.upgrade.OLAP.UpgradeImplOLAP import UpgradeImplOLAP
from impl.upgrade.upgrade_cm_impl import UpgradeCmImpl
from domain_utils.cluster_file.cluster_log import ClusterLog
from base_utils.os.env_util import EnvUtil
from base_utils.os.net_util import NetUtil
from domain_utils.domain_common.cluster_constants import ClusterConstants


class DualUpgradeShareInfo:
    """
    Holds the upgrade status shared between the primary and the standby
    cluster: a version string and a numeric upgrade status for each side.
    """

    def __init__(self, jsonInfo=None):
        """
        :param jsonInfo: optional mapping (for example the dict passed by
            json.loads through object_hook). When it is non-empty it is
            adopted directly as the attribute dictionary of the instance;
            otherwise the four status fields get their blank defaults.
        """
        if jsonInfo:
            # Reuse the decoded mapping itself as the instance namespace.
            self.__dict__ = jsonInfo
            return

        # No (or empty) input: start from a blank record.
        self.masterVersion, self.masterUpgradeStatus = "", 0
        self.standbyVersion, self.standbyUpgradeStatus = "", 0


class Upgrade(ParallelBaseOM):
    """
    The class about upgrade
    """

    def __init__(self):
        """
        Run the ParallelBaseOM initialisation and give every upgrade related
        attribute its initial value.
        """
        ParallelBaseOM.__init__(self)

        # ---- description of the old and the new cluster ----
        self.oldClusterInfo = ""
        self.oldVersion = ""
        self.oldClusterAppPath = ""
        self.newClusterAppPath = ""
        self.oldclusterVersion = None
        self.oldClusterNumber = None
        self.newClusterVersion = None
        self.newClusterNumber = None

        # ---- working locations ----
        # directory in which the information of a binary upgrade is stored
        self.upgradeBackupPath = ""
        self.userProfile = ""
        self.tmpDir = ""
        # static parameter: name of the binary tar package of this host
        self.binTarName = "binary_%s.tar" % NetUtil.GetHostIpOrName()

        # ---- cluster topology ----
        self.clusterNodes = []
        self.nodesNum = -1
        self.nodeNames = []

        # ---- upgrade mode and command line switches ----
        self.rollback = False
        self.forceRollback = False
        self.is_inplace_upgrade = False
        self.is_grey_upgrade = True
        self.upgrade_remain = False
        self.upgrade_action = ""
        self.upgrade_package = ""
        self.bypass_stale_check = True
        self.skip_resource_check = False
        self.guc_paras = {}

        # ---- dual cluster bookkeeping ----
        # upgrade status information recorded under dual clusters
        self.dualUpgradeShareInfo = None
        # primary cluster or standby cluster, dual-primary or dual-standby
        self.clusterType = ""
        # whether this is the standby cluster of a dual cluster
        self.standbyCluster = False
        # path of the file recording the upgrade stage of each cluster
        self.upgradePhaseInfoPath = ""

        # ---- compatibility checks ----
        # check if upgrade from 3.0.* version with compress table
        self.check_compresstbl_compat = True
        # check if a mot table exists and the guc parameter must be modified
        self.modifyMotParam = False

    def usage(self):
        """
gs_upgradectl is a utility to upgrade a cluster.

Usage:
  gs_upgradectl -? | --help
  gs_upgradectl -V | --version
  gs_upgradectl -t chose-strategy [-l LOGFILE]
  gs_upgradectl -t auto-upgrade -X XMLFILE [-l LOGFILE] [--grey] [--inplace-upgrade] [--skip-resource-check]
  gs_upgradectl -t auto-rollback -X XMLFILE [-l LOGFILE] [--force]
  gs_upgradectl -t commit-upgrade -X XMLFILE [-l LOGFILE]
  gs_upgradectl -t upgrade-cm --upgrade-package PACKAGE_PATH
  gs_upgradectl -S show-step

General options:
  -?, --help                      Show help information for this utility,
                                  and exit the command line mode.
  -V, --version                   Show version information.
  -t                              Subcommand for upgrade. It can be
                                  chose-strategy, auto-upgrade, auto-rollback,
                                  commit-upgrade, upgrade-cm.
  -X                              Path of the XML configuration file of the
                                  later version cluster.
  -l                              Path of log file.
  --force                         Force to rollback when cluster status is
                                  not normal
  --bypass-stale-check            allows upgrading from later releases to earlier releases                                 
  --grey                          Use grey-binary-upgrade
  --upgrade-package               Path of the CM component package
  --inplace-upgrade               Use inplace-upgrade.
  --skip-resource-check           Skip resource check.
Option for grey upgrade
  -h                              Under grey upgrade, specified nodes name.
  --continue                      Under grey upgrade, continue to upgrade
                                  remain nodes. 

        """
        # The docstring above IS the help text shown to the user, so it is
        # kept at column 0 and printed verbatim. Option names in it must match
        # the synopsis lines exactly (e.g. --skip-resource-check).
        print(self.usage.__doc__)

    def parseCommandLine(self):
        """
        Read the "upgradectl" command line through the Parameter helper and
        store the recognised options on this object.

        Prints the usage text and exits with status 0 when help was requested.
        Raises an Exception (GAUSS_51800) when the temporary directory taken
        from the environment is empty.
        """
        options = Parameter().ParameterCommandLine("upgradectl")

        # help request: show usage and leave
        if "helpFlag" in options:
            self.usage()
            sys.exit(0)

        # requested action; auto-upgrade without an explicit bypass option
        # turns the stale check bypass off
        if "action" in options:
            self.action = options["action"]
            bypass_given = "bypass_stale_check" in options
            if self.action == "auto-upgrade" and not bypass_given:
                self.bypass_stale_check = False

        # XML configuration file and log file
        if "confFile" in options:
            self.xmlFile = options["confFile"]
        if "logFile" in options:
            self.logFile = options["logFile"]

        # option that is accepted but only produces a notice
        if "inplace_upgrade" in options:
            print("We no longer maintain inplace-upgrade and use grey upgrade as default.")

        # grey upgrade options
        if "nodename" in options:
            self.nodeNames = options["nodename"]
        if "continue" in options:
            self.upgrade_remain = True

        # option that is accepted but only produces a notice
        if "force" in options:
            print("We no longer support force rollback. and '--force' is no longer in effect.")

        if "upgrade-package" in options:
            self.upgrade_package = options["upgrade-package"]
        if "skip_resource_check" in options:
            self.skip_resource_check = True

        # temporary directory and the phase information file located in it
        self.tmpDir = EnvUtil.getTmpDirFromEnv()
        if self.tmpDir == "":
            raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$PGHOST")
        self.upgradePhaseInfoPath = os.path.join(self.tmpDir, Const.UPGRADE_PHASE_INFO)

    def checkUser(self):
        """
        function: determine the invoking user and group and verify the
                  environment they run in

        Raises an Exception when the user or group name is empty
        (GAUSS_50308), when either of them is 'root' (GAUSS_50105), or when
        one of the required environment values is empty (GAUSS_51800).
        More checks about the user follow once the cluster config is loaded.
        """
        # resolve user and primary group of the current process
        self.user = pwd.getpwuid(os.getuid()).pw_name
        primary_gid = pwd.getpwnam(self.user).pw_gid
        self.group = grp.getgrgid(primary_gid).gr_name

        # neither name may be empty
        if "" in (self.user, self.group):
            raise Exception(ErrorCode.GAUSS_503["GAUSS_50308"])
        # neither name may be 'root'
        if "root" in (self.user, self.group):
            raise Exception(ErrorCode.GAUSS_501["GAUSS_50105"])

        # the env 'GAUSSHOME', 'GS_CLUSTER_NAME' and 'GAUSS_ENV' must exist;
        # each entry is (variable name, text used in the error message)
        required_env = (
            ("GAUSSHOME", "$GAUSSHOME"),
            ("GS_CLUSTER_NAME", "$GS_CLUSTER_NAME"),
            ("GAUSS_ENV", "$GAUSS_ENV"),
        )
        for env_name, shown_name in required_env:
            if EnvUtil.getEnvironmentParameterValue(env_name, self.user) == "":
                raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % shown_name)

        # the tool path environment variable (GPHOME) must be set as well
        tool_home = EnvUtil.getEnv(ClusterConstants.TOOL_PATH_ENV)
        if tool_home is None or tool_home == "":
            raise Exception(ErrorCode.GAUSS_518["GAUSS_51800"] % "$GPHOME")

    def checkParameter(self):
        """
        function: validate the options collected from the command line,
                  resolve the environment/profile files, check the user,
                  settle the log file and initialise the logger

        Raises an Exception when the action is missing, when an action that
        needs '-X' has no existing XML file, or when the log path is not
        absolute.
        """
        # '-t' is mandatory
        if self.action == "":
            raise Exception(ErrorCode.GAUSS_500["GAUSS_50001"] % "t" + ".")

        # every action except these needs '-X' pointing to an existing file
        actions_without_xml = [Const.ACTION_CHOSE_STRATEGY,
                               Const.ACTION_UPGRADE_CM,
                               Const.ACTION_SHOW_STEP]
        if self.action not in actions_without_xml:
            if self.xmlFile == "":
                raise Exception(ErrorCode.GAUSS_500["GAUSS_50001"] % 'X' + ".")
            if not os.path.exists(self.xmlFile):
                raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] %
                                self.xmlFile)

        # auto-upgrade: look for conflicting options; the check runs in
        # in-place mode only when grey upgrade is not selected
        if self.action == Const.ACTION_AUTO_UPGRADE:
            self.checkCommandConflicts(inplace=not self.is_grey_upgrade)

        # mpprc file taken from the environment; an empty or None result
        # needs no special attention here
        self.mpprcFile = EnvUtil.getEnv(DefaultValue.MPPRC_FILE_ENV)

        # environment file that is actually used
        self.userProfile = EnvUtil.getMpprcFile()
        self.checkUser()

        # log file: fall back to the OM upgrade log, and insist on an
        # absolute path
        if self.logFile == "":
            self.logFile = ClusterLog.getOMLogPath(
                ClusterConstants.UPGRADE_LOG_FILE, self.user, "", "")
        if not os.path.isabs(self.logFile):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50213"] % "log")

        self.initLogger(self.action)

    def execCommandInSpecialNode(self, cmd, hosts, retry_times=2, time_out=0):
        """
        Execute a command on the given hosts through self.sshTool, retrying
        on the hosts that have not reported success yet.

        :param cmd: command line to execute
        :param hosts: hosts to run on; when empty, a copy of
            self.clusterNodes is used
        :param retry_times: attempt budget; at least one attempt is always made
        :param time_out: kept for interface compatibility, not used here
        :raises Exception: GAUSS_53507 when a host reports failure in the last
            status that was obtained, or when the final attempt itself raised
            (previously such an error was swallowed and the call looked
            successful)
        """
        host_list = copy.deepcopy(hosts) if hosts else copy.deepcopy(self.clusterNodes)
        self.logger.debug("Commanded: exec cmd in the hosts {0}".format(host_list))

        status, output = dict(), ""
        last_error = None
        count = 0
        while True:
            try:
                (status, output) = self.sshTool.getSshStatusOutput(cmd, host_list)
                last_error = None
                for key, val in list(status.items()):
                    # only drop hosts that are still pending; a status entry
                    # for an unknown host must not abort the bookkeeping
                    if DefaultValue.SUCCESS in val and key in host_list:
                        host_list.remove(key)
                if host_list:
                    count += 1
            except Exception as error:
                count += 1
                last_error = error
                self.logger.debug("Failed to exec cmd in the hosts {0}, "
                                  "attempt {1}: {2}".format(host_list, count, str(error)))
            if not host_list or count >= retry_times:
                break

        failed_host = [key for key, val in status.items()
                       if DefaultValue.FAILURE in val]
        if not failed_host and last_error is None:
            return

        # never expose the value following -W in the error text
        replace_reg = re.compile(r'-W[ ]*[^ ]*[ ]*')
        cmd_hide_passwd = replace_reg.sub('-W *** ', cmd)

        if failed_host:
            local_file_msg = ""
            if self.localLog in cmd:
                local_log = GaussLog(self.localLog)
                local_file_msg = "\n For details about this error, see the file: {0} " \
                                 "on the host {1}".format(local_log.logFile, failed_host)
            raise Exception(ErrorCode.GAUSS_535["GAUSS_53507"] % cmd_hide_passwd
                            + local_file_msg + str(output))

        # no status reported a failure, but the last attempt could not be
        # executed at all: surface that instead of returning silently
        raise Exception(ErrorCode.GAUSS_535["GAUSS_53507"] % cmd_hide_passwd
                        + " Error: " + str(last_error)) from last_error

    def initGlobalInfos(self):
        """
        function: load the cluster information, collect the node names,
                  validate the nodes named on the command line and then
                  initialise the dual-cluster information

        Raises an Exception when the cluster has no node (GAUSS_51201) or
        when a requested node is not part of the cluster (GAUSS_51619).
        """
        self.logger.debug("Init global infos")

        # cluster info comes from the XML file when one was given,
        # otherwise from the static file of the user
        if not self.xmlFile:
            self.initClusterInfoFromStaticFile(self.user)
        else:
            self.initClusterInfo()

        # collect the names of all database nodes
        self.clusterNodes.extend(node.name for node in self.clusterInfo.dbNodes)
        if not self.clusterNodes:
            raise Exception(ErrorCode.GAUSS_512["GAUSS_51201"])

        # isSingle means the cluster consists of a single node
        if len(self.clusterNodes) == 1:
            self.isSingle = True

        # every node given on the command line must belong to the cluster
        for requested in self.nodeNames:
            if requested in self.clusterNodes:
                continue
            raise Exception(ErrorCode.GAUSS_516["GAUSS_51619"] % requested)
        self.logger.debug("Successfully init global infos")

        # dual-cluster related information
        self.initDualUpgradeInfo()

    def initDualUpgradeInfo(self):
        """
        Prepare self.dualUpgradeShareInfo.

        When the phase information file exists, its content is loaded (an
        in-place upgrade is rejected in that case unless the action is
        commit-upgrade, auto-rollback or chose-strategy). If nothing was
        loaded, a blank DualUpgradeShareInfo is used.
        :return:
        """
        if os.path.exists(self.upgradePhaseInfoPath):
            tolerated_actions = ["commit-upgrade", "auto-rollback", "chose-strategy"]
            if self.is_inplace_upgrade and self.action not in tolerated_actions:
                raise Exception("Dual cluster does not support in-place upgrade")
            self.dualUpgradeShareInfo = self.getDualUpgradeInfo(
                self.upgradePhaseInfoPath, startPost=0)

        # fall back to an empty record when no information is available
        self.dualUpgradeShareInfo = self.dualUpgradeShareInfo or DualUpgradeShareInfo()

    @staticmethod
    def getDualUpgradeInfo(filePath, startPost):
        """
        Read the dual-cluster upgrade status record stored in a file.

        The record consists of a 4 character length field at startPost,
        followed by that many characters of JSON.
        :param filePath: file holding the record
        :param startPost: offset at which the length field starts
        :return: the decoded DualUpgradeShareInfo, or None when the file is
                 missing or the length field is empty, not a number or not
                 positive
        """
        if not os.path.exists(filePath):
            return None

        with open(filePath, 'r') as phase_file:
            phase_file.seek(startPost)
            header = phase_file.read(4)
            # an empty or non numeric length field counts as "no record"
            try:
                payload_len = int(header) if header else 0
            except Exception:
                payload_len = 0
            if payload_len <= 0:
                return None
            phase_file.seek(startPost + 4)
            return json.loads(phase_file.read(payload_len),
                              object_hook=DualUpgradeShareInfo)

    def updateDualUpgradeInfo(self, dualUpgradeShareInfo, filePath, startPost):
        """
        Write the upgrade information of the cluster into the dual-cluster
        shared file and copy that file to the data directory of every DN.

        Layout written at startPost: a zero padded decimal length field
        occupying Const.LENGTH_STORAGE_INFO_LEN characters, followed by the
        JSON text of dualUpgradeShareInfo.

        :param dualUpgradeShareInfo: object to serialise (its __dict__ is
            used for anything json cannot encode directly)
        :param filePath: existing file to update
        :param startPost: offset at which the record starts
        :raises Exception: when filePath does not exist, or when the length
            field would not fit into the space reserved for it
        :return:
        """
        if not os.path.exists(filePath):
            raise Exception("{0} file does not exist and cannot be updated".format(filePath))

        # Serialise first so the length is taken from the payload itself;
        # positions returned by tell() on a text stream are opaque and must
        # not be used for arithmetic. json.dumps emits ASCII by default, so
        # the character count equals the stored size.
        payload = json.dumps(dualUpgradeShareInfo, default=lambda obj: obj.__dict__)
        header = "{0:04d}".format(len(payload))
        if len(header) > Const.LENGTH_STORAGE_INFO_LEN:
            # a wider header would overwrite the beginning of the payload
            raise Exception("The upgrade information ({0} characters) is too long to be "
                            "recorded in {1}".format(len(payload), filePath))

        with os.fdopen(os.open(filePath, os.O_WRONLY, 0o600), "w") as shareInfo:
            shareInfo.seek(startPost + Const.LENGTH_STORAGE_INFO_LEN)
            shareInfo.write(payload)
            shareInfo.seek(startPost, 0)
            shareInfo.write(header)

        # After the status file is updated, distribute it to the data
        # directory of each DN.
        for dbNode in self.clusterInfo.dbNodes:
            for dnInst in dbNode.datanodes:
                self.sshTool.scpFiles(filePath, dnInst.datadir,
                                      hostList=[dnInst.hostname])

    def distributeFileToSpecialNode(self, file, destDir, hostList):
        """
        Copy a local file to a directory on other nodes, retrying on error.

        :param file: local file to distribute; must exist
        :param destDir: destination directory on the target hosts
        :param hostList: target hosts; when empty, self.clusterNodes is used.
            The local host is removed from the targets.
        :raises Exception: GAUSS_50201 when the file does not exist; the error
            of the last attempt when every attempt failed (previously that
            error was dropped and the method returned normally)
        :return:
        """
        hostList = copy.deepcopy(hostList) if hostList else copy.deepcopy(self.clusterNodes)
        local_host = NetUtil.GetHostIpOrName()
        if local_host in hostList:
            hostList.remove(local_host)

        self.logger.debug("Start copy file:{0} to hosts:{1}.".format(
            file, hostList))
        if not os.path.exists(file):
            raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % file)
        self.logger.debug("Distribute the file %s" % file)

        # same number of attempts as before: one try plus five retries
        max_attempts = 6
        for attempt in range(1, max_attempts + 1):
            try:
                self.sshTool.scpFiles(file, destDir, hostList)
                return
            except Exception as error:
                if attempt == max_attempts:
                    self.logger.debug("Failed to distribute the file {0} after {1} "
                                      "attempts.".format(file, attempt))
                    raise
                self.logger.debug("Failed to distribute the file {0}, retry for "
                                  "the {1} time. Error: {2}".format(file, attempt, str(error)))

    def checkCommandConflicts(self, inplace=True):
        """
        function: check the command line for conflicting input

        '--continue' and '-h' are grey upgrade options.
        :param inplace: True  - neither option is allowed, an Exception is
                                raised when one of them was given;
                        False - the two options may not be combined; the
                                program exits with GAUSS_50005 when both
                                were given
        return:
        """
        # number of grey upgrade options present on the command line
        conflictPara = sum((bool(self.upgrade_remain), len(self.nodeNames) != 0))

        if inplace:
            if conflictPara > 0:
                raise Exception("The parameter %s should be used in grey "
                                "upgrade" % "'--continue, -h'")
        elif conflictPara > 1:
            # the error table is ErrorCode.GAUSS_500 (upper case), as used
            # by the other parameter checks of this class
            GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50005"] % (
                                   "--continue", "-h"))

    def do_check_pre_upgrade(self):
        """
        Run "gs_preupgradechk -i A" after sourcing the environment file and
        evaluate its result.

        The environment file is the one named by MPPDB_ENV_SEPARATE_PATH when
        that variable is set and the file exists, otherwise ~/.bashrc.

        :raises Exception: when the command exits with a non-zero status
        Ends the program through logger.logExit when the output contains
        "All items have been completed".
        """
        self.logger.log("Start check pre upgrade.")
        env_value = EnvUtil.getEnv("MPPDB_ENV_SEPARATE_PATH")
        if not env_value or not os.path.exists(env_value):
            env_value = "~/.bashrc"
        cmd = "source %s; gs_preupgradechk -i A" % env_value
        (status, output) = subprocess.getstatusoutput(cmd)
        if status != 0:
            raise Exception("Failed to check pre upgrade. Output: %s" % output)

        # NOTE(review): the completion marker is treated as the failure case
        # here; confirm against the output format of gs_preupgradechk.
        if "All items have been completed" in output:
            # show the text following "Error:"; fall back to the whole output
            # when the marker is absent instead of failing with IndexError
            error_parts = output.split('Error:')
            reason = error_parts[1] if len(error_parts) > 1 else output
            self.logger.logExit("Failed to check pre upgrade. the reason is: %s" % reason)
        else:
            self.logger.log("Successfully checked pre upgrade.")

    def check_pre_upgrade(self):
        """
        Run the pre upgrade check unless skip_resource_check is set, in
        which case only a notice is logged.
        """
        if not self.skip_resource_check:
            self.do_check_pre_upgrade()
            return

        self.logger.log("Skip pre upgrade source check.")
        
        
if __name__ == '__main__':
    # main function: refuse to run as uid 0, parse and check the command
    # line, run the pre upgrade check and hand over to the implementation
    # that matches the requested action.
    if os.getuid() == 0:
        GaussLog.exitWithError(ErrorCode.GAUSS_501["GAUSS_50105"])

    try:
        REPEAT = False
        upgrade = Upgrade()
        upgrade.parseCommandLine()
        upgrade.checkParameter()

        # check resource before upgrade
        upgrade.check_pre_upgrade()

        # set action flag file
        DefaultValue.setActionFlagFile("gs_upgradectl")

        # the CM upgrade has its own implementation; every other action
        # needs the global cluster information first
        if upgrade.action != Const.ACTION_UPGRADE_CM:
            upgrade.initGlobalInfos()
            impl = UpgradeImplOLAP(upgrade)
        else:
            impl = UpgradeCmImpl(upgrade)
        impl.run()
    except Exception as err:
        if REPEAT:
            upgrade.sshTool = SshTool(upgrade.clusterNodes,
                                      DefaultValue.TIMEOUT_PSSH_COMMON)
        GaussLog.exitWithError(str(err))
    finally:
        # the action flag file is reset on every way out
        DefaultValue.setActionFlagFile("gs_upgradectl",False)
